package com.cloudansys.core.flink.function;

import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.PayloadParseUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;

@Slf4j
public class RMQPayloadParser extends ProcessFunction<String, List<MultiDataEntity>> {

    /**
     * 处理流元素
     * @param payload 原始流元素
     * @param out 流输出收集器
     */
    @Override
    public void processElement(String payload, Context context, Collector<List<MultiDataEntity>> out) throws Exception {
        List<MultiDataEntity> multiDataEntities = PayloadParseUtil.parsePayload(payload);
        if (multiDataEntities != null) {
            out.collect(multiDataEntities);
        } else {
            log.error("原始数据格式有误！【RMQPayloadParser】");
        }
    }

}
